{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Text detection using Amazon Rekognition"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This notebook provides a walkthrough of the [text detection API](https://docs.aws.amazon.com/rekognition/latest/dg/text-detection.html) in Amazon Rekognition. You can quickly identify text in your video and image libraries to catalog footage and photos for marketing, advertising, and media industry use cases."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import boto3\n",
    "from IPython.display import HTML, display, Image as IImage\n",
    "from PIL import Image, ImageDraw, ImageFont\n",
    "import time\n",
    "import os"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import sagemaker\n",
    "import boto3\n",
    "\n",
    "sagemaker_session = sagemaker.Session()\n",
    "role = sagemaker.get_execution_role()\n",
    "bucket = sagemaker_session.default_bucket()\n",
    "region = boto3.Session().region_name"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "rekognition = boto3.client(\"rekognition\")\n",
    "s3 = boto3.client(\"s3\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!mkdir -p ./tmp\n",
    "temp_folder = \"tmp/\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Detect text in image"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "imageName = \"content-moderation/media/coffee.jpg\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "display(IImage(url=s3.generate_presigned_url(\"get_object\", Params={\"Bucket\": bucket, \"Key\": imageName})))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Call Rekognition to Detect Text in the Image\n",
    "https://docs.aws.amazon.com/rekognition/latest/dg/API_DetectText.html\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "detectTextResponse = rekognition.detect_text(\n",
    "    Image={\n",
    "        \"S3Object\": {\n",
    "            \"Bucket\": bucket,\n",
    "            \"Name\": imageName,\n",
    "        }\n",
    "    },\n",
    "    Filters={\"WordFilter\": {\"MinConfidence\": 90}},\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Review the raw JSON reponse from Rekognition\n",
    "In the JSON response below, you will see detected text, confidence score, and additional information."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "display(detectTextResponse)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Display list of detected unsafe text"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import string\n",
    "\n",
    "unsafeWords = [\"crap\", \"darn\", \"damm\"]\n",
    "for textDetection in detectTextResponse[\"TextDetections\"]:\n",
    "    # strip punctuation before checking match\n",
    "    text = textDetection[\"DetectedText\"].translate(str.maketrans(\"\", \"\", string.punctuation))\n",
    "    if textDetection[\"Type\"] == \"WORD\" and text in unsafeWords:\n",
    "        print(\"Detected unsafe word: {}\".format(textDetection[\"DetectedText\"]))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Show Bounding Boxes Around Detected Objects"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def drawBoundingBoxes(sourceImage, boxes):\n",
    "    # blue, green, red, grey\n",
    "    colors = ((255, 255, 255), (255, 255, 255), (76, 182, 252), (52, 194, 123))\n",
    "\n",
    "    # Download image locally\n",
    "    imageLocation = temp_folder + os.path.basename(sourceImage)\n",
    "    s3.download_file(bucket, sourceImage, imageLocation)\n",
    "\n",
    "    # Draws BB on Image\n",
    "    bbImage = Image.open(imageLocation)\n",
    "    draw = ImageDraw.Draw(bbImage)\n",
    "    width, height = bbImage.size\n",
    "    col = 0\n",
    "    maxcol = len(colors)\n",
    "    line = 3\n",
    "    for box in boxes:\n",
    "        x1 = int(box[1][\"Left\"] * width)\n",
    "        y1 = int(box[1][\"Top\"] * height)\n",
    "        x2 = int(box[1][\"Left\"] * width + box[1][\"Width\"] * width)\n",
    "        y2 = int(box[1][\"Top\"] * height + box[1][\"Height\"] * height)\n",
    "\n",
    "        draw.text((x1, y1), box[0], colors[col])\n",
    "        for l in range(line):\n",
    "            draw.rectangle((x1 - l, y1 - l, x2 + l, y2 + l), outline=colors[col])\n",
    "        col = (col + 1) % maxcol\n",
    "\n",
    "    imageFormat = \"PNG\"\n",
    "    ext = sourceImage.lower()\n",
    "    if ext.endswith(\"jpg\") or ext.endswith(\"jpeg\"):\n",
    "        imageFormat = \"JPEG\"\n",
    "\n",
    "    bbImage.save(imageLocation, format=imageFormat)\n",
    "\n",
    "    display(bbImage)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "boxes = []\n",
    "textDetections = detectTextResponse[\"TextDetections\"]\n",
    "for textDetection in textDetections:\n",
    "    boxes.append((textDetection[\"Type\"], textDetection[\"Geometry\"][\"BoundingBox\"]))\n",
    "\n",
    "drawBoundingBoxes(imageName, boxes)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Detect Text in Image using Filters and Regions of Interest"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "imageName = \"content-moderation/media/coffee.jpg\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "display(IImage(url=s3.generate_presigned_url(\"get_object\", Params={\"Bucket\": bucket, \"Key\": imageName})))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Call Amazon Rekognition to detect text in the image\n",
    "# https://docs.aws.amazon.com/rekognition/latest/dg/API_DetectText.html\n",
    "\n",
    "detectTextResponse = rekognition.detect_text(\n",
    "    Image={\n",
    "        \"S3Object\": {\n",
    "            \"Bucket\": bucket,\n",
    "            \"Name\": imageName,\n",
    "        }\n",
    "    },\n",
    "    Filters={\n",
    "        \"WordFilter\": {\"MinConfidence\": 90, \"MinBoundingBoxHeight\": 0.05, \"MinBoundingBoxWidth\": 0.02},\n",
    "        \"RegionsOfInterest\": [\n",
    "            {\"BoundingBox\": {\"Width\": 0.1, \"Height\": 0.05, \"Left\": 0.01, \"Top\": 0.01}},\n",
    "        ],\n",
    "    },\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Review JSON Response from Rekognition Text API (Text Detection)\n",
    "In the JSON response below, you will see detected text, confidence score, and additional information.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "display(detectTextResponse)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "for textDetection in detectTextResponse[\"TextDetections\"]:\n",
    "    text = textDetection[\"DetectedText\"]\n",
    "    if textDetection[\"Type\"] == \"WORD\":\n",
    "        print(\"Word: {}\".format(textDetection[\"DetectedText\"]))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Detect Text in Video\n",
    " Text detection in video is an async operation. \n",
    "https://docs.aws.amazon.com/rekognition/latest/dg/text-detecting-video-procedure.html.\n",
    "\n",
    "- First we start a text detection job which returns a Job Id.\n",
    "- We can then call `get_text_detection` to get the job status and after job is complete, we can get object metadata.\n",
    "- In production use cases, you would usually use StepFunction or SNS topic to get notified when job is complete."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "videoName = \"content-moderation/media/serverless-bytes.mov\"\n",
    "\n",
    "strDetail = \"Text detected in video<br>=======================================<br>\"\n",
    "strOverall = \"Text in the overall video:<br>=======================================<br>\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "s3VideoUrl = s3.generate_presigned_url(\"get_object\", Params={\"Bucket\": bucket, \"Key\": videoName})\n",
    "\n",
    "videoTag = \"<video controls='controls' autoplay width='640' height='360' name='Video' src='{0}'></video>\".format(\n",
    "    s3VideoUrl\n",
    ")\n",
    "\n",
    "videoui = \"<table><tr><td style='vertical-align: top'>{}</td></tr></table>\".format(videoTag)\n",
    "\n",
    "display(HTML(videoui))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Call Rekognition to start a job for text detection"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "startTextDetection = rekognition.start_text_detection(\n",
    "    Video={\n",
    "        \"S3Object\": {\n",
    "            \"Bucket\": bucket,\n",
    "            \"Name\": videoName,\n",
    "        }\n",
    "    },\n",
    ")\n",
    "\n",
    "textJobId = startTextDetection[\"JobId\"]\n",
    "display(\"Job Id: {0}\".format(textJobId))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Wait for Text Detection Job to Complete\n",
    "In production use cases, you would usually use StepFunction or SNS topic to get notified when job is complete.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "getTextDetection = rekognition.get_text_detection(JobId=textJobId)\n",
    "\n",
    "while getTextDetection[\"JobStatus\"] == \"IN_PROGRESS\":\n",
    "    time.sleep(5)\n",
    "    print(\".\", end=\"\")\n",
    "\n",
    "    getTextDetection = rekognition.get_text_detection(JobId=textJobId)\n",
    "\n",
    "display(getTextDetection[\"JobStatus\"])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Review Raw JSON Response from Rekognition Text Detection API\n",
    "In the JSON response below, you will see list of detected text.\n",
    "\n",
    "For each detected object, you will see information like `Timestamp`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "display(getTextDetection)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Display Recognized Text in the Video"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "flaggedTextInVideo = [\"AWS\", \"Twitter\"]\n",
    "\n",
    "theLines = {}\n",
    "\n",
    "# Objects detected in each frame\n",
    "for obj in getTextDetection[\"TextDetections\"]:\n",
    "    if obj[\"TextDetection\"][\"Type\"] == \"WORD\":\n",
    "        ts = obj[\"Timestamp\"]\n",
    "        cconfidence = obj[\"TextDetection\"][\"Confidence\"]\n",
    "        oname = obj[\"TextDetection\"][\"DetectedText\"]\n",
    "\n",
    "        if oname in flaggedTextInVideo:\n",
    "            print(\"Found flagged text at {} ms: {} (Confidence: {})\".format(ts, oname, round(cconfidence, 2)))\n",
    "\n",
    "        strDetail = strDetail + \"At {} ms: {} (Confidence: {})<br>\".format(ts, oname, round(cconfidence, 2))\n",
    "        if oname in theLines:\n",
    "            cojb = theLines[oname]\n",
    "            theLines[oname] = {\"Text\": oname, \"Count\": 1 + cojb[\"Count\"]}\n",
    "        else:\n",
    "            theLines[oname] = {\"Text\": oname, \"Count\": 1}\n",
    "\n",
    "# Unique objects detected in video\n",
    "for theLine in theLines:\n",
    "    strOverall = strOverall + \"Name: {}, Count: {}<br>\".format(theLine, theLines[theLine][\"Count\"])\n",
    "\n",
    "# Display results\n",
    "display(HTML(strOverall))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "listui = \"<table><tr><td style='vertical-align: top'>{}</td></tr></table>\".format(strDetail)\n",
    "display(HTML(listui))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# References\n",
    "- https://docs.aws.amazon.com/rekognition/latest/dg/API_DetectText.html\n",
    "- https://docs.aws.amazon.com/rekognition/latest/dg/API_StartTextDetection.html\n",
    "- https://docs.aws.amazon.com/rekognition/latest/dg/API_GetTextDetection.html"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Congratulations!\n",
    "You have successfully used Amazon Rekognition to identify text in images an videos."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Release Resources"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%html\n",
    "\n",
    "<p><b>Shutting down your kernel for this notebook to release resources.</b></p>\n",
    "<button class=\"sm-command-button\" data-commandlinker-command=\"kernelmenu:shutdown\" style=\"display:none;\">Shutdown Kernel</button>\n",
    "        \n",
    "<script>\n",
    "try {\n",
    "    els = document.getElementsByClassName(\"sm-command-button\");\n",
    "    els[0].click();\n",
    "}\n",
    "catch(err) {\n",
    "    // NoOp\n",
    "}    \n",
    "</script>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%javascript\n",
    "\n",
    "try {\n",
    "    Jupyter.notebook.save_checkpoint();\n",
    "    Jupyter.notebook.session.delete();\n",
    "}\n",
    "catch(err) {\n",
    "    // NoOp\n",
    "}"
   ]
  }
 ],
 "metadata": {
  "instance_type": "ml.t3.medium",
  "kernelspec": {
   "display_name": "Python 3 (Data Science)",
   "language": "python",
   "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
